package com.simafei.flow.web.service.impl;

import cn.hutool.core.collection.CollectionUtil;
import com.simafei.flow.core.Edge;
import com.simafei.flow.core.EdgeResult;
import com.simafei.flow.core.ExecutionContext;
import com.simafei.flow.core.ExecutionListener;
import com.simafei.flow.core.Flow;
import com.simafei.flow.core.FlowResult;
import com.simafei.flow.core.Node;
import com.simafei.flow.core.NodeResult;
import com.simafei.flow.core.common.ExecStatus;
import com.simafei.flow.core.common.FlowStatus;
import com.simafei.flow.core.json.JsonUtils;
import com.simafei.flow.web.entity.EdgeResultPO;
import com.simafei.flow.web.entity.FlowResultPO;
import com.simafei.flow.web.entity.NodeResultPO;
import com.simafei.flow.web.service.IEdgeResultService;
import com.simafei.flow.web.service.IFlowResultService;
import com.simafei.flow.web.service.INodeResultService;
import com.simafei.flow.web.transfer.EdgeResultTransfer;
import com.simafei.flow.web.transfer.NodeResultTransfer;
import com.simafei.flow.web.util.SseHelper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.Objects;
import java.util.stream.Collectors;

/**
 * {@link ExecutionListener} that persists node, edge and flow results through
 * the corresponding result services and forwards node/edge results to the
 * {@link SseHelper} channel keyed by the execution id.
 *
 * @author fengpengju
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class FlowExecutionListener implements ExecutionListener {

    private final INodeResultService nodeResultService;

    private final IEdgeResultService edgeResultService;

    private final IFlowResultService flowResultService;

    private final SseHelper sseHelper;

    /**
     * Stores the outcome of a single node execution and pushes it to the SSE
     * channel of the current execution.
     *
     * @param context execution context supplying the flow id and execution id
     * @param node    the node that was executed
     * @param result  the node's execution result
     */
    @Override
    public void afterNodeExecute(ExecutionContext context, Node node, NodeResult result) {
        NodeResultPO po = new NodeResultPO();
        po.setFlowId(context.getFlowId());
        po.setExecId(context.getExecId());
        po.setNodeId(node.getId());
        po.setParentId(result.getParentId());
        po.setNodeType(node.getNodeType().name());
        if (Objects.nonNull(result.getCause())) {
            po.setErrorMsg(describe(result.getCause()));
        }
        po.setSuccess(result.isSuccess());
        po.setExecParams(JsonUtils.toJsonString(result.getInputParams()));
        po.setExecResult(JsonUtils.toJsonString(result.getResults()));
        // Start and end are distinct timestamps; the start must not be copied from the end time.
        po.setStartTime(result.getStartTime());
        po.setEndTime(result.getEndTime());
        po.setExecStatus(result.getExecStatus().name());
        log.debug("nodeResult:{}", result);
        nodeResultService.replace(po);

        sseHelper.send(context.getExecId(), NodeResultTransfer.INSTANCE.toResp(po));
    }

    /**
     * Stores the outcome of a single edge evaluation and pushes it to the SSE
     * channel of the current execution.
     *
     * @param context execution context supplying the flow id and execution id
     * @param edge    the edge that was evaluated
     * @param result  the edge's evaluation result
     */
    @Override
    public void afterEdgeExecute(ExecutionContext context, Edge edge, EdgeResult result) {
        EdgeResultPO po = new EdgeResultPO();
        po.setFlowId(context.getFlowId());
        po.setExecId(context.getExecId());
        po.setEdgeId(edge.getId());
        po.setPass(result.isPass());
        po.setSpendTime(result.getSpendTime());
        po.setExecParams(JsonUtils.toJsonString(result.getInputParams()));
        po.setFilteredParams(JsonUtils.toJsonString(result.getFilteredParams()));
        log.debug("edgeResult:{}", result);
        edgeResultService.save(po);

        sseHelper.send(context.getExecId(), EdgeResultTransfer.INSTANCE.toResp(po));
    }

    /**
     * Stores the overall flow result, then completes the SSE channel of the
     * execution. The channel is completed even when storing the result fails.
     *
     * @param context execution context supplying ids and the test-mode flag
     * @param flow    the flow that finished
     * @param result  the aggregated flow result
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void afterFlowExecute(ExecutionContext context, Flow flow, FlowResult result) {
        try {
            FlowResultPO po = new FlowResultPO();
            po.setExecId(context.getExecId());
            po.setFlowId(context.getFlowId());
            po.setExecParam(JsonUtils.toJsonString(result.getExecParam()));
            po.setExecResult(JsonUtils.toJsonString(result.getExecResults()));
            po.setExecStatus(result.getExecStatus().name());
            po.setFlowStatus(result.getFlowStatus().name());
            po.setTestMode(context.isTest());
            po.setSpendTime(result.getSpendTime());
            if (CollectionUtil.isNotEmpty(result.getCauses())) {
                // One line per cause; describe() avoids literal "null" entries for message-less exceptions.
                po.setErrorMsg(result.getCauses().stream()
                        .map(FlowExecutionListener::describe)
                        .collect(Collectors.joining("\n")));
            }
            log.debug("flowResult:{}", result);
            save(po);
        } finally {
            // Same order as onFlowError: persist first, then close the stream, whatever happened above.
            sseHelper.complete(context.getExecId());
        }
    }

    /**
     * Logs a flow-level failure, stores a FAILED/REJECT flow result with an
     * empty result list, then completes the SSE channel of the execution. The
     * channel is completed even when storing the result fails.
     *
     * @param context   execution context supplying ids, input params and start time
     * @param flow      the flow that failed
     * @param throwable the error that aborted the flow
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onFlowError(ExecutionContext context, Flow flow, Throwable throwable) {
        log.error("flow id={} execution error", flow.getId(), throwable);

        try {
            FlowResultPO po = new FlowResultPO();
            po.setExecId(context.getExecId());
            po.setFlowId(context.getFlowId());
            po.setExecParam(JsonUtils.toJsonString(context.getInputParams()));
            po.setExecResult("[]");
            po.setExecStatus(ExecStatus.FAILED.name());
            po.setFlowStatus(FlowStatus.REJECT.name());
            po.setTestMode(context.isTest());
            // No FlowResult is available here, so elapsed time is derived from the context start time.
            po.setSpendTime(System.currentTimeMillis() - context.getStartTime());
            po.setErrorMsg(describe(throwable));
            save(po);
        } finally {
            // Always close the stream so it is not left open when persisting fails.
            sseHelper.complete(context.getExecId());
        }
    }

    /**
     * Hands the flow result to the flow result service.
     *
     * @param flowResult the flow result record to store
     */
    private void save(FlowResultPO flowResult) {
        flowResultService.replace(flowResult);
    }

    /**
     * Returns a non-null description of a throwable: its message when present,
     * otherwise its {@code toString()} form.
     *
     * @param cause the throwable to describe
     * @return the message, or the {@code toString()} fallback when the message is null
     */
    private static String describe(Throwable cause) {
        String message = cause.getMessage();
        return Objects.nonNull(message) ? message : cause.toString();
    }
}
